{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb3cf1fc",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Execute this cell to install dependencies\n",
    "%pip install sf-hamilton[sdk,visualization] falkordb openai"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4682d46e",
   "metadata": {},
   "source": [
    "# Ingestion Notebook [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/apache/hamilton/blob/main/examples/LLM_Workflows/GraphRAG/ingest_notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/apache/hamilton/blob/main/examples/LLM_Workflows/GraphRAG/ingest_notebook.ipynb)\n",
    "\n",
    "\n",
    "In this notebook we see how to load data into FalkorDB using [Hamilton](https://github.com/apache/hamilton)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "9c81ff55",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:05:26.275402Z",
     "start_time": "2024-05-26T05:05:25.579866Z"
    }
   },
   "outputs": [],
   "source": [
    "# load jupyter magic to help display Hamilton in a notebook\n",
    "%load_ext hamilton.plugins.jupyter_magic"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ee990a80",
   "metadata": {},
   "source": [
    "## Fighter ingestion pipeline\n",
    "\n",
    "We first load a CSV of fighter details, then write each row (one per fighter) into FalkorDB.\n",
    "\n",
    "We use Hamilton to iterate over the fighters in parallel, which keeps our code more unit-testable and manageable."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "cd97754d",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:12:04.660817Z",
     "start_time": "2024-05-26T05:12:04.295564Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cell_to_module -m load_fighters --display\n",
    "\n",
    "import pandas as pd\n",
    "from hamilton.htypes import Parallelizable, Collect\n",
    "import falkordb\n",
    "import utils\n",
    "\n",
    "def raw_fighter_details() -> pd.DataFrame:\n",
    "    \"\"\"Read the raw fighter details CSV into a DataFrame.\"\"\"\n",
    "    return pd.read_csv('../data/raw_fighter_details.csv')\n",
    "\n",
    "def fighter(raw_fighter_details: pd.DataFrame) -> Parallelizable[pd.Series]:\n",
    "    \"\"\"Yield the fighter rows one at a time so that downstream nodes run once per fighter.\"\"\"\n",
    "    yield from (row for _, row in raw_fighter_details.iterrows())\n",
    "\n",
    "def record(fighter: pd.Series) -> dict:\n",
    "    \"\"\"Turn a single fighter row into the dict of properties stored on its node.\n",
    "\n",
    "    Height, Weight, Reach, Stance and DOB are only added when the raw value is a\n",
    "    non-empty string; the statistics after them are always converted and added.\n",
    "    \"\"\"\n",
    "    def _has_text(value) -> bool:\n",
    "        # anything other than a non-empty string counts as missing\n",
    "        return isinstance(value, str) and value != \"\"\n",
    "\n",
    "    attrs = {'Name': fighter.fighter_name}\n",
    "\n",
    "    if _has_text(fighter.Height):\n",
    "        attrs['Height'] = utils.height_to_cm(fighter.Height)\n",
    "    if _has_text(fighter.Weight):\n",
    "        # drop the ' lbs.' suffix and keep the number as an int\n",
    "        attrs['Weight'] = int(fighter.Weight.replace(' lbs.', ''))\n",
    "    if _has_text(fighter.Reach):\n",
    "        attrs['Reach'] = utils.reach_to_cm(fighter.Reach)\n",
    "    if _has_text(fighter.Stance):\n",
    "        attrs['Stance'] = fighter.Stance\n",
    "    if _has_text(fighter.DOB):\n",
    "        attrs['DOB'] = utils.date_to_timestamp(fighter.DOB)\n",
    "\n",
    "    # (column, converter) pairs, applied in this order\n",
    "    stat_converters = (\n",
    "        ('SLpM', float),  # Significant Strikes Landed per Minute\n",
    "        ('Str_Acc', utils.percentage_to_float),  # Strike accuracy\n",
    "        ('SApM', float),  # Significant Strikes Absorbed per Minute\n",
    "        ('Str_Def', utils.percentage_to_float),  # Strikes defended\n",
    "        ('TD_Avg', float),  # Takedown average\n",
    "        ('TD_Acc', utils.percentage_to_float),  # Takedown accuracy\n",
    "        ('TD_Def', utils.percentage_to_float),  # Takedown defense\n",
    "        ('Sub_Avg', float),  # Submission average\n",
    "    )\n",
    "    for column, convert in stat_converters:\n",
    "        attrs[column] = convert(fighter[column])\n",
    "    return attrs\n",
    "\n",
    "def write_to_graph(record: Collect[dict], graph: falkordb.Graph) -> int:\n",
    "    \"\"\"Create a :Fighter node for every collected record and return how many records were sent.\"\"\"\n",
    "    fighters = list(record)\n",
    "    # A single UNWIND query creates all the fighter nodes in one call.\n",
    "    graph.query(\n",
    "        \"UNWIND $fighters as fighter CREATE (f:Fighter) SET f = fighter\",\n",
    "        {'fighters': fighters},\n",
    "    )\n",
    "    return len(fighters)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f56e3fa6",
   "metadata": {},
   "source": [
    "## Fights ingestion pipeline\n",
    "\n",
    "This code then takes the fight data and creates the graph structure required by\n",
    "FalkorDB to link things together.\n",
    "\n",
    "It then uses Hamilton's `Parallelizable`/`Collect` constructs to iterate over each fight and push each one individually\n",
    "to FalkorDB."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "c1ae988c",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:44:01.899490Z",
     "start_time": "2024-05-26T05:44:01.430713Z"
    }
   },
   "outputs": [],
   "source": [
    "%%cell_to_module -m load_fights --display\n",
    "\n",
    "import pandas as pd\n",
    "import utils\n",
    "from hamilton.htypes import Parallelizable, Collect\n",
    "import falkordb\n",
    "\n",
    "def raw_total_fight_data() -> pd.DataFrame:\n",
    "    \"\"\"Read the semicolon-delimited raw fight CSV into a DataFrame.\"\"\"\n",
    "    return pd.read_csv('../data/raw_total_fight_data.csv', delimiter=\";\")\n",
    "\n",
    "def transformed_data(raw_total_fight_data: pd.DataFrame) -> pd.DataFrame:\n",
    "    \"\"\"Does some light transformation on the data and adds a \"Loser\" column.\n",
    "\n",
    "    A new DataFrame is returned; ``raw_total_fight_data`` itself is not modified, so\n",
    "    the upstream node's result stays exactly as it was loaded.\n",
    "\n",
    "    :param raw_total_fight_data: the raw fight data.\n",
    "    :return: the data with ``last_round`` cast to int, ``last_round_time`` and ``date``\n",
    "        converted via ``utils``, and a ``Loser`` column appended.\n",
    "    \"\"\"\n",
    "    _raw = raw_total_fight_data\n",
    "    # The loser is the blue corner when the red corner won, otherwise the red corner\n",
    "    # (this includes rows where \"Winner\" matches neither corner, e.g. when it is missing).\n",
    "    _red_won = _raw[\"Winner\"] == _raw[\"R_fighter\"]\n",
    "    # assign() builds a new frame instead of writing into the input.\n",
    "    return _raw.assign(\n",
    "        last_round=_raw[\"last_round\"].astype(int),\n",
    "        last_round_time=_raw[\"last_round_time\"].apply(utils.time_to_seconds),\n",
    "        date=_raw[\"date\"].apply(utils.date_to_timestamp),\n",
    "        Loser=_raw[\"B_fighter\"].where(_red_won, _raw[\"R_fighter\"]),\n",
    "    )\n",
    "\n",
    "\n",
    "def columns_of_interest() -> list[str]:\n",
    "    \"\"\"List the fight columns that are carried forward for loading into the graph.\"\"\"\n",
    "    # Not selected for now: R_KD, B_KD, R_SIG_STR., B_SIG_STR., R_SIG_STR_pct, B_SIG_STR_pct,\n",
    "    # R_TOTAL_STR., B_TOTAL_STR., R_TD, B_TD, R_TD_pct, B_TD_pct, R_SUB_ATT, B_SUB_ATT,\n",
    "    # R_REV, B_REV, R_CTRL, B_CTRL, R_HEAD, B_HEAD, R_BODY, B_BODY, R_LEG, B_LEG,\n",
    "    # R_DISTANCE, B_DISTANCE, R_CLINCH, B_CLINCH, R_GROUND, B_GROUND, win_by\n",
    "    corners = [\"R_fighter\", \"B_fighter\"]\n",
    "    fight_details = [\n",
    "        \"last_round\",\n",
    "        \"last_round_time\",\n",
    "        \"Format\",\n",
    "        \"Referee\",\n",
    "        \"date\",\n",
    "        \"location\",\n",
    "        \"Fight_type\",\n",
    "    ]\n",
    "    outcome = [\"Winner\", \"Loser\"]\n",
    "    return corners + fight_details + outcome\n",
    "\n",
    "\n",
    "def fight(transformed_data: pd.DataFrame, \n",
    "        columns_of_interest: list[str],\n",
    "        ) -> Parallelizable[pd.Series]:\n",
    "    \"\"\"Yield one row per fight, restricted to the columns of interest, so that downstream nodes run once per fight.\"\"\"\n",
    "    _selected = transformed_data[columns_of_interest]\n",
    "    yield from (_row for _, _row in _selected.iterrows())\n",
    "\n",
    "def write_to_graph(fight: pd.Series, graph:  falkordb.Graph) -> str:\n",
    "    \"\"\"Write one fight to the graph, linked to its card, referee and both fighters.\n",
    "\n",
    "    Four queries are issued in order: MERGE the referee, MERGE the card, CREATE the\n",
    "    fight node with its PART_OF / RED / BLUE / REFEREED relationships, and finally\n",
    "    CREATE the WON / LOST relationships.\n",
    "\n",
    "    :param fight: a single row of fight data.\n",
    "    :param graph: the FalkorDB graph to write to.\n",
    "    :return: the string \"success\".\n",
    "    \"\"\"\n",
    "    # create referee; a referee that is not a string is stored under the empty name\n",
    "    referee = fight.Referee if isinstance(fight.Referee, str) else \"\"\n",
    "    graph.query(\"MERGE (:Referee {Name: $name})\", {'name': referee})\n",
    "\n",
    "    # create card; a card is identified by its date and location\n",
    "    card = {'date': fight.date, 'location': fight.location}\n",
    "    graph.query(\"MERGE (c:Card {Date: $date, Location: $location})\", card)\n",
    "\n",
    "    # create fight and attach it to the card, the referee and both corners\n",
    "    create_fight = \"\"\"MATCH (c:Card {Date: $date, Location: $location})\n",
    "           MATCH (ref:Referee {Name: $referee})\n",
    "           MATCH (r:Fighter {Name:$R_fighter})\n",
    "           MATCH (b:Fighter {Name:$B_fighter})\n",
    "           CREATE (f:Fight)-[:PART_OF]->(c)\n",
    "           SET f = $fight\n",
    "           CREATE (f)-[:RED]->(r)\n",
    "           CREATE (f)-[:BLUE]->(b)\n",
    "           CREATE (ref)-[:REFEREED]->(f)\n",
    "           RETURN ID(f)\n",
    "        \"\"\"\n",
    "    fight_params = {\n",
    "        **card,\n",
    "        'referee': referee,\n",
    "        'R_fighter': fight.R_fighter,\n",
    "        'B_fighter': fight.B_fighter,\n",
    "        'fight': {\n",
    "            'Last_round': fight.last_round,\n",
    "            'Last_round_time': fight.last_round_time,\n",
    "            'Format': fight.Format,\n",
    "            'Fight_type': fight.Fight_type,\n",
    "        },\n",
    "    }\n",
    "    # the query returns the id of the fight node it just created\n",
    "    fight_id = graph.query(create_fight, fight_params).result_set[0][0]\n",
    "\n",
    "    # mark winner & loser; a winner that is not a string is looked up as \"\"\n",
    "    mark_outcome = \"\"\"MATCH (f:Fight) WHERE ID(f) = $fight_id\n",
    "           MATCH (l:Fighter {Name:$loser})\n",
    "           MATCH (w:Fighter {Name:$winner})\n",
    "           CREATE (w)-[:WON]->(f), (l)-[:LOST]->(f)\n",
    "        \"\"\"\n",
    "    winner = fight.Winner if isinstance(fight.Winner, str) else \"\"\n",
    "    graph.query(mark_outcome, {'fight_id': fight_id, 'loser': fight.Loser, 'winner': winner})\n",
    "    return \"success\"\n",
    "\n",
    "def collect_writes(write_to_graph: Collect[str]) -> int:\n",
    "    \"\"\"Gather the per-fight write results and return how many there were.\"\"\"\n",
    "    return sum(1 for _ in write_to_graph)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8b1a2966",
   "metadata": {},
   "source": [
    "## Exercise the pipelines\n",
    "\n",
    "We set up the requisite objects and clients to run the pipelines."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "9c7fdd81",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:44:05.459158Z",
     "start_time": "2024-05-26T05:44:05.448476Z"
    }
   },
   "outputs": [],
   "source": [
    "import falkordb\n",
    "from hamilton import driver\n",
    "from hamilton.execution import executors\n",
    "\n",
    "# Connect to FalkorDB at localhost:6379 and get a handle on the \"UFC\" graph.\n",
    "db = falkordb.FalkorDB(host='localhost', port=6379)\n",
    "g  = db.select_graph(\"UFC\")\n",
    "\n",
    "# Clear previous graph: the ingestion pipelines add nodes with CREATE, so loading\n",
    "# into an already-populated graph would add the same fighters and fights again.\n",
    "if \"UFC\" in db.list_graphs():\n",
    "    g.delete()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0836b087",
   "metadata": {},
   "source": [
    "### Note about the Hamilton UI\n",
    "Hamilton comes with an [open source UI](https://hamilton.apache.org/hamilton-ui/) that you can\n",
    "use to surface information about your Hamilton executions. If you pull the docker containers\n",
    "locally and uncomment the lines that add the HamiltonTracker, then you'll see runs logged to it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "f5443a7f",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:44:33.832688Z",
     "start_time": "2024-05-26T05:44:06.038046Z"
    }
   },
   "outputs": [],
   "source": [
    "# Optional: if you have the Hamilton UI running, you can see progress there.\n",
    "# Uncomment the tracker below and the `.with_adapters(tracker)` line in the Builder.\n",
    "# from hamilton_sdk import adapters\n",
    "# tracker = adapters.HamiltonTracker(\n",
    "#    project_id=44,  # modify this as needed\n",
    "#    username=\"you@example.com\",  # modify this as needed\n",
    "#    dag_name=\"load_fighters\",\n",
    "#    tags={\"environment\": \"DEV\", \"team\": \"MY_TEAM\", \"version\": \"X\"}\n",
    "# )\n",
    "\n",
    "\n",
    "# Build the Hamilton Driver for the fighter pipeline defined in the `load_fighters` module.\n",
    "fighter_loader = (\n",
    "    driver.Builder()\n",
    "    .with_modules(load_fighters)\n",
    "    # dynamic execution is what runs the Parallelizable/Collect nodes of the pipeline\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    # the per-fighter tasks run on a thread pool, up to 5 at a time\n",
    "    .with_remote_executor(executors.MultiThreadingExecutor(5))\n",
    "    # .with_adapters(tracker)   # <--- uncomment this to add tracker.\n",
    "    .build()\n",
    ")\n",
    "\n",
    "# Request the `write_to_graph` node; the graph handle `g` is supplied as the `graph` input.\n",
    "fighter_loader.execute([\"write_to_graph\"], inputs={\"graph\": g})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "3107f89b",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:49:42.579219Z",
     "start_time": "2024-05-26T05:44:33.835479Z"
    }
   },
   "outputs": [],
   "source": [
    "# Optional: if you have the Hamilton UI running, you can see progress there.\n",
    "# Uncomment the tracker below and the `.with_adapters(tracker)` line in the Builder.\n",
    "# from hamilton_sdk import adapters\n",
    "# tracker = adapters.HamiltonTracker(\n",
    "#    project_id=44,  # modify this as needed\n",
    "#    username=\"you@example.com\",  # modify this as needed\n",
    "#    dag_name=\"load_fights\",\n",
    "#    tags={\"environment\": \"DEV\", \"team\": \"MY_TEAM\", \"version\": \"X\"}\n",
    "# )\n",
    "\n",
    "# Build the Hamilton Driver for the fights pipeline defined in the `load_fights` module.\n",
    "fights_loader = (\n",
    "    driver.Builder()\n",
    "    .with_modules(load_fights)\n",
    "    # dynamic execution is what runs the Parallelizable/Collect nodes of the pipeline\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_remote_executor(executors.MultiThreadingExecutor(5))  # this will do 5 concurrent inserts\n",
    "    # .with_adapters(tracker)   # <--- uncomment this to add tracker.\n",
    "    .build()\n",
    ")\n",
    "# Request the `collect_writes` node; the graph handle `g` is supplied as the `graph` input.\n",
    "fights_loader.execute([\"collect_writes\"], inputs={\"graph\": g})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b0a0d9fe",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2024-05-26T05:49:42.591409Z",
     "start_time": "2024-05-26T05:49:42.580724Z"
    }
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4c83c11d",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
